
### Extract snapshots of financial distress measures at various geographic levels

# Load packages
import calendar
import os
import sys
import shutil
import subprocess
import pandas as pd
import numpy  as np
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools import *




# spark = SparkContext()
# spark.setLogLevel("ERROR")
# sqlContext = SQLContext(spark) 


# Filesystem locations used by this script.
#   WORKDIR: scratch directory; intermediate parquet tables are written here
#            and it is made the current working directory just below.
#   CODEDIR: directory the script changes back to once it has finished.
#   OUTDIR : destination directory handed to cleanOutput() for final tables.
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR  = '/geo_debt/geo/Output/Table'

# Crosswalk table, written as a Spark SQL "parquet.`<path>`" reference so it
# can be dropped straight into a FROM/JOIN clause.  The queries in the main
# loop join it on its `zip` column and read cz, czname, statefip and
# stateabbr from it.
CW      = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

# Go to the working directory.
os.chdir(WORKDIR)

#########################
###       Main        ###
#########################

# Snapshot dates: one snapshot per year in yList, all taken in month m.
# range() excludes its upper bound, so the years are 2000 through 2016.
yList = range(2000,2017)
# Alternative snapshot month, currently disabled:
# m     = 9
m     = 6


# Build one summary table per (snapshot year, geographic level).
#
# For every year in yList the per-subject snapshot table 'merge_temp' is built
# once under WORKDIR by data_snapshot(); it is then collapsed to each
# geographic level in turn and handed to cleanOutput(), which receives the
# final name 'sumstats_<yyyymm>_<level>' and OUTDIR.
#
# NOTE(review): `sqlContext` is not created in this file (the setup near the
# top is commented out).  It is presumably supplied by the pyspark shell or by
# the star import from SparkDataTools -- confirm before running standalone.
#
# NOTE(review): the snapshot does not depend on the geographic level, so it is
# built once per year rather than once per (level, year).  This assumes
# cleanOutput() leaves 'merge_temp' in place -- confirm against SparkDataTools.

# Geographic levels, in processing order.  Each entry is
#   (level tag used as output-file suffix,
#    columns selected ahead of the aggregates,
#    columns to GROUP BY).
_GEO_LEVELS = (
    #### collapse at cz level
    ('cz',
     "a.asOfDate, c.cz, c.czname, c.statefip, c.stateabbr",
     "a.asOfDate, c.cz, c.czname, c.statefip, c.stateabbr"),
    #### collapse at state level
    ('st',
     "a.asOfDate, c.statefip AS st, c.stateabbr",
     "a.asOfDate, c.statefip, c.stateabbr"),
    #### collapse at zipcode level
    ('zip',
     "a.asOfDate, a.zipcode, c.cz, c.czname, c.statefip, c.stateabbr",
     "a.asOfDate, a.zipcode, c.cz, c.czname, c.statefip, c.stateabbr"),
    #### individual level -- grouping includes subjectKey, so no collapse
    ('indi',
     "a.subjectKey, a.asOfDate, a.zipcode, c.cz, c.czname, c.statefip, c.stateabbr",
     "a.asOfDate, a.zipcode, c.cz, c.czname, c.statefip, c.stateabbr, a.subjectKey"),
)

# SQL reference to the snapshot table written by data_snapshot(); the path is
# the same for every year, so it is computed once.
temp = 'parquet.`' + os.path.join(WORKDIR, 'merge_temp') + '`'

try:
    # for each snapshot
    for y in yList:
        ym = yyyymm(y, m)

        # post-2008, we restrict to those with valid birth dates who are
        # between 20 and 80 years of age
        if y > 2008:
            headersData = table_setup(sqlContext, y, m, 'headers')
            headersData.createOrReplaceTempView("headersData")

            # Keep only the subject keys that pass the age filter and store
            # them as the 'age_filter' table that data_snapshot() is told to use.
            sql2run = "SELECT subjectKey FROM headersData WHERE birthDate IS NOT NULL AND FLOOR((asOfDate - birthDate)/10000) BETWEEN 20 AND 80 "
            sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'age_filter'))

            aggSQL = data_snapshot(sqlContext, WORKDIR, y, m, outTab='merge_temp', filterTab='age_filter')
        else:
            aggSQL = data_snapshot(sqlContext, WORKDIR, y, m, outTab='merge_temp')

        # Collapse the same snapshot to every geographic level.
        for GEO, selectCols, groupCols in _GEO_LEVELS:
            # aggSQL is followed directly by "FROM", exactly as before, so it
            # is expected to carry its own trailing whitespace.
            sql2run = "SELECT " + selectCols + ", " + aggSQL + \
                  "FROM " + temp + " a LEFT JOIN " + CW + " c " + \
                  "ON a.zipcode = c.zip " + \
                  "GROUP BY " + groupCols
            sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'sumstats_' + ym))

            # clean up output:
            # remove auxiliary spark files, and rename output file
            cleanOutput('sumstats_' + ym + '_' + GEO, 'sumstats_' + ym, OUTDIR, WORKDIR, sqlContext)

        # Drop anything cached for this year before 'merge_temp' is overwritten.
        sqlContext.clearCache()
finally:
    # Always return to the code directory, even if a Spark job failed, so an
    # interactive session is not left sitting in WORKDIR.
    os.chdir(CODEDIR)
print(' !!!! SUCCESS !!!!')